use super::{MReadIdx, MWriteIdx, Notify, Queue, SReadIdx, SWriteIdx, Token, TokenFactory};
use crate::{prelude::*, Error};
use core::alloc::Layout;
use core::mem::MaybeUninit;
use core::ptr;
use core::slice;
use core::time::Duration;
use hipool::{Allocator, Arc, PoolAlloc};

/// Consumer-side operations of a channel.
///
/// In this file the trait is implemented by `Pipe` for the `MReadIdx` and `SReadIdx`
/// read-index types, and `Receiver` is the handle that calls into it.
pub trait Reader {
    /// Type of the values taken out of the channel.
    type Item;
    /// Waits for the channel to become readable, bounded by `timeout` when one is given.
    ///
    /// The `Pipe` implementations forward this to their `Notify` value, which defines the
    /// exact semantics. Callers in this file feed a returned `Some(d)` back in as the next
    /// timeout, i.e. they treat it as the remaining time budget.
    fn wait_read(&self, timeout: Option<Duration>) -> Option<Duration>;
    /// Tries to take a single item; `None` means no item was obtained by this call.
    fn read(&self, token: &Token) -> Option<Self::Item>;
    /// Tries to fill `datas` from its start.
    ///
    /// Callers in this file interpret `Some(n)` as "the first `n` slots of `datas` were
    /// written" and `None` as "nothing could be read right now".
    fn read_slice(&self, datas: &mut [MaybeUninit<Self::Item>], token: &Token) -> Option<usize>;
    /// Returns the notifier's read-side event description.
    ///
    /// NOTE(review): presumably a file descriptor plus a flag; the meaning of both tuple
    /// fields is defined by the `Notify` implementation — confirm there.
    fn read_fd_event(&self) -> (i32, bool);
    /// Returns the length reported by the underlying queue.
    ///
    /// NOTE(review): the `Pipe` implementations return the `len()` of the queue's inner
    /// storage, which looks like the slot capacity rather than the number of pending
    /// items — confirm against `Queue`.
    fn qlen(&self) -> usize;
}

/// Producer-side operations of a channel.
///
/// In this file the trait is implemented by `Pipe` for the `SWriteIdx` and `MWriteIdx`
/// write-index types, and `Sender` is the handle that calls into it.
pub trait Writer {
    /// Type of the values put into the channel.
    type Item;
    /// Waits for the channel to become writable, bounded by `timeout` when one is given.
    ///
    /// The `Pipe` implementations forward this to their `Notify` value, which defines the
    /// exact semantics. Callers in this file feed a returned `Some(d)` back in as the next
    /// timeout, i.e. they treat it as the remaining time budget.
    fn wait_write(&self, timeout: Option<Duration>) -> Option<Duration>;
    /// Tries to store `val`; on failure the value is handed back in `Err` so it is not lost.
    fn write(&self, val: Self::Item, token: &Token) -> Result<(), Self::Item>;
    /// Tries to store the elements of `datas`, starting at its first element.
    ///
    /// Callers in this file interpret `Some(n)` as "the first `n` elements were accepted"
    /// and `None` as "nothing could be written right now".
    fn write_slice(&self, datas: &[MaybeUninit<Self::Item>], token: &Token) -> Option<usize>;
    /// Returns the notifier's write-side event description.
    ///
    /// NOTE(review): presumably a file descriptor plus a flag; the meaning of both tuple
    /// fields is defined by the `Notify` implementation — confirm there.
    fn write_fd_event(&self) -> (i32, bool);
    /// Returns the length reported by the underlying queue.
    ///
    /// NOTE(review): the `Pipe` implementations return the `len()` of the queue's inner
    /// storage, which looks like the slot capacity rather than the number of pending
    /// items — confirm against `Queue`.
    fn qlen(&self) -> usize;
}

/// Shared state of a channel: a slot queue, one write index, one read index and a notifier.
///
/// The concrete `W` and `R` types select which `Writer` / `Reader` implementation in this
/// file applies (`SWriteIdx` / `MWriteIdx` and `SReadIdx` / `MReadIdx` respectively).
pub struct Pipe<'a, T, W, R, N>
where
    W: Default,
    R: Default,
    N: Notify,
{
    // Slot storage; built over a borrowed `[MaybeUninit<T>]` buffer in `init`.
    queue: Queue<'a, T>,
    // Write-side index, created with `W::default()`.
    widx: W,
    // Read-side index, created with `R::default()`.
    ridx: R,
    // Wait / wake-up mechanism used by both sides, created with `N::default()`.
    notify: N,
}

impl<T, W, R, N> Pipe<'static, T, W, R, N>
where
    N: Notify,
    R: Default,
    W: Default,
{
    /// Creates a reference-counted pipe using `PoolAlloc` as the allocator.
    ///
    /// This is `new_in` with the allocator fixed; `len` is interpreted exactly as it is
    /// there.
    ///
    /// # Errors
    ///
    /// Returns whatever error `new_in` reports.
    pub fn new(len: usize) -> Result<Arc<'static, Self>, Error> {
        let pool = PoolAlloc;
        Self::new_in(pool, len)
    }
}

impl<'a, T, W, R, N> Pipe<'a, T, W, R, N>
where
    W: Default,
    R: Default,
    N: Notify,
{
    pub fn new_in<A>(alloc: A, len: usize) -> Result<Arc<'a, Self, A>, Error>
    where
        A: Allocator + Clone,
    {
        let len = Self::queue_size(len);
        let layout = Layout::array::<T>(len).map_err(|_| Error::new(EINVAL))?;
        Arc::new_with_buf_in(alloc, layout, |uninit: &mut MaybeUninit<Self>, buf| {
            let buf = buf.cast::<MaybeUninit<T>>().as_ptr();
            let slice = unsafe { slice::from_raw_parts_mut(buf, len) };
            unsafe { Self::init(uninit.as_mut_ptr(), slice) };
            Ok(())
        })
    }

    fn queue_size(len: usize) -> usize {
        if len == 0 {
            8
        } else {
            get_len(len)
        }
    }

    pub(crate) unsafe fn init(this: *mut Self, buf: &'a mut [MaybeUninit<T>]) {
        ptr::addr_of_mut!((*this).queue).write(Queue::new(buf));
        ptr::addr_of_mut!((*this).widx).write(W::default());
        ptr::addr_of_mut!((*this).ridx).write(R::default());
        ptr::addr_of_mut!((*this).notify).write(N::default());
    }
}

impl<'a, T, W, N> Reader for Pipe<'a, T, W, MReadIdx, N>
where
    W: Default,
    N: Notify,
{
    type Item = T;

    /// Forwards the wait to the pipe's notifier.
    fn wait_read(&self, timeout: Option<Duration>) -> Option<Duration> {
        let notifier = &self.notify;
        notifier.wait_read(timeout)
    }

    /// Pops one item through the queue's `mc_pop_one` path, which takes the caller's token.
    fn read(&self, token: &Token) -> Option<T> {
        let Self {
            queue,
            ridx,
            notify,
            ..
        } = self;
        queue.mc_pop_one(ridx, notify, token)
    }

    /// Pops items into `datas` through the queue's `mc_pop_slice` path.
    fn read_slice(&self, datas: &mut [MaybeUninit<T>], token: &Token) -> Option<usize> {
        let Self {
            queue,
            ridx,
            notify,
            ..
        } = self;
        queue.mc_pop_slice(ridx, datas, notify, token)
    }

    /// Forwards to the notifier's `read_fd_event`.
    fn read_fd_event(&self) -> (i32, bool) {
        let notifier = &self.notify;
        notifier.read_fd_event()
    }

    /// Returns the length of the queue's inner storage.
    fn qlen(&self) -> usize {
        let ring = &self.queue;
        ring.queue.len()
    }
}

impl<'a, T, W, N> Reader for Pipe<'a, T, W, SReadIdx, N>
where
    W: Default,
    N: Notify,
{
    type Item = T;

    /// Forwards the wait to the pipe's notifier.
    fn wait_read(&self, timeout: Option<Duration>) -> Option<Duration> {
        let notifier = &self.notify;
        notifier.wait_read(timeout)
    }

    /// Pops one item through the queue's `sc_pop_one` path; the token is not used here.
    fn read(&self, _token: &Token) -> Option<T> {
        let Self {
            queue,
            ridx,
            notify,
            ..
        } = self;
        queue.sc_pop_one(ridx, notify)
    }

    /// Pops items into `datas` through the queue's `sc_pop_slice` path; the token is not
    /// used here.
    fn read_slice(&self, datas: &mut [MaybeUninit<T>], _token: &Token) -> Option<usize> {
        let Self {
            queue,
            ridx,
            notify,
            ..
        } = self;
        queue.sc_pop_slice(ridx, datas, notify)
    }

    /// Forwards to the notifier's `read_fd_event`.
    fn read_fd_event(&self) -> (i32, bool) {
        let notifier = &self.notify;
        notifier.read_fd_event()
    }

    /// Returns the length of the queue's inner storage.
    fn qlen(&self) -> usize {
        let ring = &self.queue;
        ring.queue.len()
    }
}

impl<'a, T, R, N> Writer for Pipe<'a, T, SWriteIdx, R, N>
where
    R: Default,
    N: Notify,
{
    type Item = T;

    /// Forwards the wait to the pipe's notifier.
    fn wait_write(&self, timeout: Option<Duration>) -> Option<Duration> {
        let notifier = &self.notify;
        notifier.wait_write(timeout)
    }

    /// Pushes `val` through the queue's `sp_push_one` path; the token is not used here.
    fn write(&self, val: T, _token: &Token) -> Result<(), T> {
        let Self {
            queue,
            widx,
            notify,
            ..
        } = self;
        queue.sp_push_one(widx, val, notify)
    }

    /// Pushes elements of `datas` through the queue's `sp_push_slice` path; the token is
    /// not used here.
    fn write_slice(&self, datas: &[MaybeUninit<T>], _token: &Token) -> Option<usize> {
        let Self {
            queue,
            widx,
            notify,
            ..
        } = self;
        queue.sp_push_slice(widx, datas, notify)
    }

    /// Forwards to the notifier's `write_fd_event`.
    fn write_fd_event(&self) -> (i32, bool) {
        let notifier = &self.notify;
        notifier.write_fd_event()
    }

    /// Returns the length of the queue's inner storage.
    fn qlen(&self) -> usize {
        let ring = &self.queue;
        ring.queue.len()
    }
}

impl<'a, T, R, N> Writer for Pipe<'a, T, MWriteIdx, R, N>
where
    R: Default,
    N: Notify,
{
    type Item = T;

    /// Forwards the wait to the pipe's notifier.
    fn wait_write(&self, timeout: Option<Duration>) -> Option<Duration> {
        let notifier = &self.notify;
        notifier.wait_write(timeout)
    }

    /// Pushes `val` through the queue's `mp_push_one` path, which takes the caller's token.
    fn write(&self, val: T, token: &Token) -> Result<(), T> {
        let Self {
            queue,
            widx,
            notify,
            ..
        } = self;
        queue.mp_push_one(widx, val, notify, token)
    }

    /// Pushes elements of `datas` through the queue's `mp_push_slice` path.
    fn write_slice(&self, datas: &[MaybeUninit<T>], token: &Token) -> Option<usize> {
        let Self {
            queue,
            widx,
            notify,
            ..
        } = self;
        queue.mp_push_slice(widx, datas, notify, token)
    }

    /// Forwards to the notifier's `write_fd_event`.
    fn write_fd_event(&self) -> (i32, bool) {
        let notifier = &self.notify;
        notifier.write_fd_event()
    }

    /// Returns the length of the queue's inner storage.
    fn qlen(&self) -> usize {
        let ring = &self.queue;
        ring.queue.len()
    }
}

/// Receiving handle of a channel.
///
/// Holds a shared reference-counted pointer to the channel state `U` plus a token source
/// `K`; every read passes `token.get()` down to the `Reader` implementation.
#[repr(C)]
pub struct Receiver<'a, T, K, U, A>
where
    U: Reader<Item = T>,
    A: Allocator,
    K: TokenFactory,
{
    // Shared channel state; cloned (reference-counted) when the handle is cloned.
    channel: Arc<'a, U, A>,
    // Per-handle token source, created with `K::new()`.
    token: K,
}

// Allows a `Receiver` to be moved to another thread when the channel state and the item
// type are `Send`.
//
// NOTE(review): the handle owns a shared `Arc` to `U` and can be cloned, so two threads may
// end up accessing the same `U` through `&U`. Soundness therefore presumably also depends
// on `U` being safe for concurrent shared access (`Sync`) and on `A` and `K` being safe to
// move across threads; none of that is required by the bounds below — confirm it is
// guaranteed by the concrete types used with this handle.
unsafe impl<T, K, U, A> Send for Receiver<'_, T, K, U, A>
where
    U: Reader<Item = T> + Send,
    T: Send,
    A: Allocator,
    K: TokenFactory,
{
}

impl<T, K, U, A> Clone for Receiver<'_, T, K, U, A>
where
    U: Reader<Item = T>,
    A: Allocator,
    K: TokenFactory,
{
    /// Returns a new handle to the same channel.
    ///
    /// The channel pointer is cloned; the token is not copied — the new handle is built
    /// with `Self::new`, which creates its own token.
    fn clone(&self) -> Self {
        let shared = self.channel.clone();
        Self::new(shared)
    }
}

impl<'a, T, K, U, A> Receiver<'a, T, K, U, A>
where
    U: Reader<Item = T>,
    A: Allocator,
    K: TokenFactory,
{
    /// Wraps `channel` in a receiving handle with a token freshly created by `K::new()`.
    pub(crate) fn new(channel: Arc<'a, U, A>) -> Self {
        let token = K::new();
        Self { channel, token }
    }
}

impl<T, K, U, A> Receiver<'_, T, K, U, A>
where
    U: Reader<Item = T>,
    A: Allocator,
    K: TokenFactory,
{
    /// Receives one value, waiting without a time limit until one is available.
    pub fn recv(&self) -> T {
        loop {
            if let Some(val) = self.channel.read(self.token.get()) {
                return val;
            }
            self.channel.wait_read(None);
        }
    }

    /// Attempts to receive one value without waiting; `None` if nothing was read.
    pub fn try_recv(&self) -> Option<T> {
        self.channel.read(self.token.get())
    }

    /// Fills all of `datas`, waiting without a time limit whenever the channel reports
    /// that nothing can be read.
    ///
    /// Returns only after `datas.len()` slots have been written; an empty `datas` returns
    /// immediately.
    pub fn recv_slice(&self, datas: &mut [MaybeUninit<T>]) {
        let mut cnt = 0;
        while cnt < datas.len() {
            match self.channel.read_slice(&mut datas[cnt..], self.token.get()) {
                Some(n) => cnt += n,
                None => {
                    self.channel.wait_read(None);
                }
            }
        }
    }

    /// Attempts a single slice read without waiting.
    ///
    /// Returns the count reported by the channel, or `None` if nothing could be read.
    pub fn try_recv_slice(&self, datas: &mut [MaybeUninit<T>]) -> Option<usize> {
        self.channel.read_slice(datas, self.token.get())
    }

    /// Receives one value, waiting for at most `timeout` in total.
    ///
    /// A read is always attempted at least once, even with a zero `timeout`. Returns
    /// `None` when the time budget is exhausted without a value.
    pub fn recv_timeout(&self, mut timeout: Duration) -> Option<T> {
        loop {
            if let Some(val) = self.try_recv() {
                return Some(val);
            }
            if timeout.is_zero() {
                return None;
            }
            // The wait hands back the remaining budget. Treat a missing value as
            // "no time left" instead of panicking, so one final read is attempted
            // and then `None` is returned.
            timeout = self
                .channel
                .wait_read(Some(timeout))
                .unwrap_or_default();
        }
    }

    /// Performs slice reads until one succeeds, waiting for at most `timeout` in total.
    ///
    /// Returns the count reported by the first successful read (which may be less than
    /// `datas.len()`), or `None` when the time budget is exhausted first. A read is always
    /// attempted at least once, even with a zero `timeout`.
    pub fn recv_slice_timeout(
        &self,
        datas: &mut [MaybeUninit<T>],
        mut timeout: Duration,
    ) -> Option<usize> {
        loop {
            if let Some(n) = self.try_recv_slice(datas) {
                return Some(n);
            }
            if timeout.is_zero() {
                return None;
            }
            // See `recv_timeout`: a missing remaining-time value counts as expired.
            timeout = self
                .channel
                .wait_read(Some(timeout))
                .unwrap_or_default();
        }
    }

    /// Returns a blocking iterator: each `next` call waits in `recv`, so the iterator
    /// never ends.
    pub fn iter(&self) -> Iter<'_, T, K, U, A> {
        Iter::new(self)
    }

    /// Returns a `TryIter`, whose `next` uses `try_recv` and ends at the first failed read.
    ///
    /// NOTE(review): despite the name, this is the non-blocking iterator; the blocking one
    /// is returned by `iter`. The name is kept for compatibility with existing callers.
    pub fn block_iter(&self) -> TryIter<'_, T, K, U, A> {
        TryIter::new(self)
    }

    /// Returns the channel's read-side event description (see `Reader::read_fd_event`).
    pub fn read_fd_event(&self) -> (i32, bool) {
        self.channel.read_fd_event()
    }

    /// Returns the length reported by the channel (see `Reader::qlen`).
    pub fn qlen(&self) -> usize {
        self.channel.qlen()
    }
}

/// Blocking iterator over a `Receiver`.
///
/// Its `Iterator::next` wraps `Receiver::recv` in `Some`, so it waits for each value and
/// never yields `None`.
pub struct Iter<'a, T, K, U, A>
where
    U: Reader<Item = T>,
    A: Allocator,
    K: TokenFactory,
{
    // Borrowed handle the values are received through.
    receiver: &'a Receiver<'a, T, K, U, A>,
}

impl<'a, T, K, U, A> Iter<'a, T, K, U, A>
where
    U: Reader<Item = T>,
    A: Allocator,
    K: TokenFactory,
{
    fn new(receiver: &'a Receiver<'a, T, K, U, A>) -> Self {
        Self { receiver }
    }
}

impl<'a, T, K, U, A> Iterator for Iter<'a, T, K, U, A>
where
    U: Reader<Item = T>,
    A: Allocator,
    K: TokenFactory,
{
    type Item = T;

    /// Waits in `Receiver::recv` for the next value; always returns `Some`.
    fn next(&mut self) -> Option<Self::Item> {
        let received = self.receiver.recv();
        Some(received)
    }
}

/// Non-blocking iterator over a `Receiver`.
///
/// Its `Iterator::next` returns the result of `Receiver::try_recv`, so iteration stops at
/// the first read that yields nothing.
pub struct TryIter<'a, T, K, U, A>
where
    U: Reader<Item = T>,
    A: Allocator,
    K: TokenFactory,
{
    // Borrowed handle the values are received through.
    receiver: &'a Receiver<'a, T, K, U, A>,
}

impl<'a, T, K, U, A> TryIter<'a, T, K, U, A>
where
    U: Reader<Item = T>,
    A: Allocator,
    K: TokenFactory,
{
    fn new(receiver: &'a Receiver<'a, T, K, U, A>) -> Self {
        Self { receiver }
    }
}

impl<'a, T, K, U, A> Iterator for TryIter<'a, T, K, U, A>
where
    U: Reader<Item = T>,
    A: Allocator,
    K: TokenFactory,
{
    type Item = T;

    /// Makes one `Receiver::try_recv` attempt and returns its result unchanged.
    fn next(&mut self) -> Option<Self::Item> {
        let attempt = self.receiver.try_recv();
        attempt
    }
}

/// Sending handle of a channel.
///
/// Holds a shared reference-counted pointer to the channel state `U` plus a token source
/// `K`; every write passes `token.get()` down to the `Writer` implementation.
pub struct Sender<'a, T, K, U, A>
where
    U: Writer<Item = T>,
    A: Allocator,
    K: TokenFactory,
{
    // Shared channel state; cloned (reference-counted) when the handle is cloned.
    channel: Arc<'a, U, A>,
    // Per-handle token source, created with `K::new()`.
    token: K,
}

// Allows a `Sender` to be moved to another thread when the channel state and the item type
// are `Send`.
//
// NOTE(review): the handle owns a shared `Arc` to `U` and can be cloned, so two threads may
// end up accessing the same `U` through `&U`. Soundness therefore presumably also depends
// on `U` being safe for concurrent shared access (`Sync`) and on `A` and `K` being safe to
// move across threads; none of that is required by the bounds below — confirm it is
// guaranteed by the concrete types used with this handle.
unsafe impl<T, K, U, A> Send for Sender<'_, T, K, U, A>
where
    U: Writer<Item = T> + Send,
    T: Send,
    A: Allocator,
    K: TokenFactory,
{
}

impl<'a, T, K, U, A> Clone for Sender<'a, T, K, U, A>
where
    U: Writer<Item = T>,
    A: Allocator,
    K: TokenFactory,
{
    /// Returns a new handle to the same channel.
    ///
    /// The channel pointer is cloned; the token is not copied — the new handle is built
    /// with `Self::new`, which creates its own token.
    fn clone(&self) -> Self {
        let shared = self.channel.clone();
        Self::new(shared)
    }
}

impl<'a, T, K, U, A> Sender<'a, T, K, U, A>
where
    U: Writer<Item = T>,
    A: Allocator,
    K: TokenFactory,
{
    /// Wraps `channel` in a sending handle with a token freshly created by `K::new()`.
    pub(crate) fn new(channel: Arc<'a, U, A>) -> Self {
        let token = K::new();
        Self { channel, token }
    }
}

impl<T, K, U, A> Sender<'_, T, K, U, A>
where
    U: Writer<Item = T>,
    A: Allocator,
    K: TokenFactory,
{
    /// Sends `val`, waiting without a time limit until the channel accepts it.
    pub fn send(&self, mut val: T) {
        // A rejected write hands the value back, so it can be retried after waiting.
        while let Err(rejected) = self.channel.write(val, self.token.get()) {
            self.channel.wait_write(None);
            val = rejected;
        }
    }

    /// Attempts to send `val` without waiting; on failure the value is returned in `Err`.
    pub fn try_send(&self, val: T) -> Result<(), T> {
        self.channel.write(val, self.token.get())
    }

    /// Sends every element of `datas`, waiting without a time limit whenever the channel
    /// reports that nothing can be written.
    ///
    /// Returns only after `datas.len()` elements have been accepted; an empty `datas`
    /// returns immediately.
    ///
    /// NOTE(review): the elements are passed as shared `MaybeUninit<T>` values, so the
    /// queue presumably copies them out bitwise; if so, the caller must treat accepted
    /// elements as moved — confirm against the queue's push-slice implementation.
    pub fn send_slice(&self, datas: &[MaybeUninit<T>]) {
        let mut cnt = 0;
        while cnt < datas.len() {
            match self.channel.write_slice(&datas[cnt..], self.token.get()) {
                Some(n) => cnt += n,
                None => {
                    self.channel.wait_write(None);
                }
            }
        }
    }

    /// Attempts a single slice write without waiting.
    ///
    /// Returns the count reported by the channel, or `None` if nothing could be written.
    pub fn try_send_slice(&self, datas: &[MaybeUninit<T>]) -> Option<usize> {
        self.channel.write_slice(datas, self.token.get())
    }

    /// Sends `val`, waiting for at most `timeout` in total.
    ///
    /// A write is always attempted at least once, even with a zero `timeout`. When the
    /// time budget is exhausted the value is handed back in `Err`.
    pub fn send_timeout(&self, mut val: T, mut timeout: Duration) -> Result<(), T> {
        loop {
            match self.try_send(val) {
                Ok(()) => return Ok(()),
                Err(rejected) => {
                    if timeout.is_zero() {
                        return Err(rejected);
                    }
                    val = rejected;
                    // The wait hands back the remaining budget. Treat a missing value
                    // as "no time left" instead of panicking, so one final write is
                    // attempted and then the value is returned to the caller.
                    timeout = self
                        .channel
                        .wait_write(Some(timeout))
                        .unwrap_or_default();
                }
            }
        }
    }

    /// Performs slice writes until one succeeds, waiting for at most `timeout` in total.
    ///
    /// Returns the count reported by the first successful write (which may be less than
    /// `datas.len()`), or `None` when the time budget is exhausted first. A write is
    /// always attempted at least once, even with a zero `timeout`.
    pub fn send_slice_timeout(
        &self,
        datas: &[MaybeUninit<T>],
        mut timeout: Duration,
    ) -> Option<usize> {
        loop {
            if let Some(n) = self.try_send_slice(datas) {
                return Some(n);
            }
            if timeout.is_zero() {
                return None;
            }
            // See `send_timeout`: a missing remaining-time value counts as expired.
            timeout = self
                .channel
                .wait_write(Some(timeout))
                .unwrap_or_default();
        }
    }

    /// Returns the channel's write-side event description (see `Writer::write_fd_event`).
    pub fn write_fd_event(&self) -> (i32, bool) {
        self.channel.write_fd_event()
    }

    /// Returns the length reported by the channel (see `Writer::qlen`).
    pub fn qlen(&self) -> usize {
        self.channel.qlen()
    }
}

/// Rounds `len` up to the nearest power of two; a value that already is a power of two is
/// returned unchanged, and `0` yields `1`.
///
/// Uses the standard library instead of a hand-rolled bit-smearing loop. The loop
/// underflowed on `len == 0` and shifted by the full bit width of `usize` for large
/// inputs, both of which panic in builds with overflow checks enabled.
///
/// If the result does not fit in `usize`, `next_power_of_two` panics in debug builds and
/// returns `0` in release builds.
fn get_len(len: usize) -> usize {
    len.next_power_of_two()
}
